Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][connector-v2] add tablestore source and sink #3309

Merged
merged 33 commits into from
Nov 15, 2022

Conversation

liugddx
Copy link
Member

@liugddx liugddx commented Nov 5, 2022

Purpose of this pull request

The tablestore source connector reuse JDBC connector.

Check list

@liugddx liugddx marked this pull request as draft November 5, 2022 07:56
@liugddx liugddx closed this Nov 5, 2022
@liugddx liugddx reopened this Nov 6, 2022
@liugddx
Copy link
Member Author

liugddx commented Nov 6, 2022

env {
  # You can set flink configuration here
  execution.parallelism = 1
  job.mode = "BATCH"
  #execution.checkpoint.interval = 10000
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}

source {
  # This is a example source plugin **only for test and demonstrate the feature source plugin**
  Jdbc {
    url = "jdbc:ots:https://p00spwbz7y1w.cn-shanghai.ots.aliyuncs.com/p00spwbz7y1w"
    driver = "com.alicloud.openservices.tablestore.jdbc.OTSDriver"
    user = "xxxxxxxx"
    password = "xxxxx"
    query = "select * from table_test"
  }

  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
  # please go to https://seatunnel.apache.org/docs/category/source-v2
}

transform {


  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
  # please go to https://seatunnel.apache.org/docs/category/transform
}

sink {
  Console {
    parallelism = 1
  }

  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
  # please go to https://seatunnel.apache.org/docs/category/sink-v2
}

@liugddx
Copy link
Member Author

liugddx commented Nov 7, 2022

This is the test data.
image

@liugddx
Copy link
Member Author

liugddx commented Nov 7, 2022

flink engine test result
image

@liugddx
Copy link
Member Author

liugddx commented Nov 7, 2022

spark engine test result.
image

@liugddx
Copy link
Member Author

liugddx commented Nov 9, 2022

sink test result:

  1. source data
    image
  2. flink result
    image
  3. spark result
    image

@liugddx
Copy link
Member Author

liugddx commented Nov 9, 2022

sink connector test conf

env {
  # You can set flink configuration here
  execution.parallelism = 1
  job.mode = "BATCH"
  #execution.checkpoint.interval = 10000
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}

source {
  # This is a example source plugin **only for test and demonstrate the feature source plugin**
  Jdbc {
    url = "jdbc:ots:https://p00spwbz7y1w.cn-shanghai.ots.aliyuncs.com/p00spwbz7y1w"
    driver = "com.alicloud.openservices.tablestore.jdbc.OTSDriver"
    user = "xxx"
    password = "xxx"
    query = "select * from source"
  }


  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
  # please go to https://seatunnel.apache.org/docs/category/source-v2
}

transform {

  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
  # please go to https://seatunnel.apache.org/docs/category/transform
}

sink {
  Tablestore {
    end_point = "https://p00spwbz7y1w.cn-shanghai.ots.aliyuncs.com"
    instance_name = "xxx"
    access_key_id = "xxx"
    access_key_secret = "xxx"
    table = "sink"
    primary_keys = ["pk_1","pk_2","pk_3"]
  }

@liugddx liugddx marked this pull request as ready for review November 9, 2022 14:14
@liugddx
Copy link
Member Author

liugddx commented Nov 9, 2022

@hailin0 @Hisoka-X @EricJoy2048 PTAL,thanks

@Hisoka-X Hisoka-X requested a review from ic4y November 10, 2022 02:50
@@ -139,7 +130,7 @@ private SeaTunnelRowType initTableField(Connection conn) {
ArrayList<String> fieldNames = new ArrayList<>();
try {
PreparedStatement ps = conn.prepareStatement(jdbcSourceOptions.getJdbcConnectionOptions().getQuery());
ResultSetMetaData resultSetMetaData = ps.getMetaData();
ResultSetMetaData resultSetMetaData = ps.executeQuery().getMetaData();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why need add executeQuery

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

executeQuery has a great impact on the performance of other databases, it is recommended to use JdbcDialect to achieve

initialize = true;
}

public synchronized void write(RowPutChange rowPutChange) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no synchronized needed

}
}

public synchronized void close() throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto

<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-tablestore</artifactId>
<version>${project.version}</version>
<scope>provided</scope> </dependency>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keep the format

if (config.hasPath(TablestoreConfig.DEFAULT_BATCH_INTERVAL_MS)) {
this.batchIntervalMs = config.getInt(TablestoreConfig.DEFAULT_BATCH_INTERVAL_MS);
}
if (config.hasPath(TablestoreConfig.PRIMARY_KEYS)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

already checked in checkAllExists

Copy link
Member

@EricJoy2048 EricJoy2048 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add OptionRule reference #3337.

- [Feature] Support DB2 JDBC Sink ([2410](https://github.com/apache/incubator-seatunnel/pull/2410))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add changed log


The primaryKeys of Tablestore.

### common options
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
### common options
### common options [ config ]

@liugddx
Copy link
Member Author

liugddx commented Nov 11, 2022

@EricJoy2048 @ic4y done. PTAL again,thanks.


@Override
public OptionRule optionRule() {
return OptionRule.builder().required(END_POINT, TABLE, INSTANCE_NAME, ACCESS_KEY_ID, ACCESS_KEY_SECRET, PRIMARY_KEYS, SeaTunnelSchema.SCHEMA).build();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lost the option options?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lost the option options?

What is the meaning of it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

liugddx and others added 9 commits November 14, 2022 16:34
…/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java

Co-authored-by: Eric <gaojun2048@gmail.com>
…/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java

Co-authored-by: Eric <gaojun2048@gmail.com>
…/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java

Co-authored-by: Eric <gaojun2048@gmail.com>
…/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java

Co-authored-by: Eric <gaojun2048@gmail.com>
…/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java

Co-authored-by: Eric <gaojun2048@gmail.com>
…/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java

Co-authored-by: Eric <gaojun2048@gmail.com>
…/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java

Co-authored-by: Eric <gaojun2048@gmail.com>
…/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java

Co-authored-by: Eric <gaojun2048@gmail.com>
Copy link
Contributor

@ic4y ic4y left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants